package cn.allms.boot.mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

/**
 * MQ消费者
 *
 * @author xieya
 * @date 2021/8/27
 */
public class ConsumerServer {
    public static void getMqMsg() throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("allms_mq");

        // 设置NameServer的地址
        consumer.setNamesrvAddr("159.75.102.95:9876");

        // 订阅一个或者多个Topic，以及Tag来过滤需要消费的消息
        consumer.subscribe("AllmsTopicTest", "*");
        // 拉取模式 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            msg.forEach(msgItem ->
                    System.out.printf("%s 收到新消息: %s %n", Thread.currentThread().getName(),
                            new String(msgItem.getBody())
                    )
            );


            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    public static void main(String[] args) throws Exception {
        // 消费消息 拉取模式
        getMqMsg();
    }
}
